When dealing with big data, some datasets will have a much higher frequency of "events" than others.

An example would be a table that tracks each pageview: it's not uncommon for two people to visit a site at the same time, especially a very popular site such as Google.

I will illustrate how you can deal with these types of events when you need to order by time.

Library Imports

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F, Window

Template

spark = (
    SparkSession.builder
    .master("local")
    .appName("Exploring Joins")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

sc = spark.sparkContext

Option 1: Order by the date column only

window = (
    Window
    .partitionBy('breed_id')
    .orderBy('birthday')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
pets = spark.createDataFrame(
    [
        (1, 1, datetime(2018, 1, 1, 1, 1, 1), 45),
        (2, 1, datetime(2018, 1, 1, 1, 1, 1), 20),
    ], ['id', 'breed_id', 'birthday', 'age']
)

pets.withColumn('first_pet_of_breed', F.first('id').over(window)).toPandas()
   id  breed_id            birthday  age  first_pet_of_breed
0   1         1 2018-01-01 01:01:01   45                   1
1   2         1 2018-01-01 01:01:01   20                   1

pets = spark.createDataFrame(
    [
        (2, 1, datetime(2018, 1, 1, 1, 1, 1), 20),
        (1, 1, datetime(2018, 1, 1, 1, 1, 1), 45),
    ], ['id', 'breed_id', 'birthday', 'age']
)

pets.withColumn('first_pet_of_breed', F.first('id').over(window)).toPandas()
   id  breed_id            birthday  age  first_pet_of_breed
0   2         1 2018-01-01 01:01:01   20                   2
1   1         1 2018-01-01 01:01:01   45                   2

What Happened:

  • By changing the order of the input rows (which would happen naturally with larger amounts of data spread across different partitions), we got a different "first" value.
  • The timestamps here are only accurate to the second; if data is coming in faster than that, multiple rows tie on the date column and ordering by it alone is ambiguous, as the quick check after this list shows.
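
Before relying on a time-based window, it can help to check how often such ties actually occur. Here is a minimal sketch, reusing the pets DataFrame and column names from the example above:

# Count rows that share the same (breed_id, birthday) pair; any count
# greater than 1 means orderBy('birthday') alone cannot order those
# rows deterministically.
(
    pets
    .groupBy('breed_id', 'birthday')
    .count()
    .where(F.col('count') > 1)
    .show()
)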

Option 2: Order by the date and id columns

Window Object

window_2 = (
    Window
    .partitionBy('breed_id')
    .orderBy('birthday', 'id')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
pets = spark.createDataFrame(
    [
        (1, 1, datetime(2018, 1, 1, 1, 1, 1), 45),
        (2, 1, datetime(2018, 1, 1, 1, 1, 1), 20),
    ], ['id', 'breed_id', 'birthday', 'age']
)

pets.withColumn('first_pet_of_breed', F.first('id').over(window_2)).toPandas()
   id  breed_id            birthday  age  first_pet_of_breed
0   1         1 2018-01-01 01:01:01   45                   1
1   2         1 2018-01-01 01:01:01   20                   1

pets = spark.createDataFrame(
    [
        (2, 1, datetime(2018, 1, 1, 1, 1, 1), 20),
        (1, 1, datetime(2018, 1, 1, 1, 1, 1), 45),
    ], ['id', 'breed_id', 'birthday', 'age']
)

pets.withColumn('first_pet_of_breed', F.first('id').over(window_2)).toPandas()
   id  breed_id            birthday  age  first_pet_of_breed
0   1         1 2018-01-01 01:01:01   45                   1
1   2         1 2018-01-01 01:01:01   20                   1

What Happened:

  • We get the same "first" value in both cases, which is what we expect; the composite ordering also makes other window functions deterministic, as the sketch after this list shows.
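
For example, with the tie broken by id, a ranking function over the same ordering is stable too. A minimal sketch (ranking functions need an ordered window, so we define one without an explicit frame):

# The same pet always receives rank 1, regardless of input row order.
rank_window = Window.partitionBy('breed_id').orderBy('birthday', 'id')

pets.withColumn('rank_within_breed', F.row_number().over(rank_window)).toPandas()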

TL;DR

In databases, the id (primary key) column of a table is usually monotonically increasing. Therefore, if we are dealing with frequently arriving data, we can additionally sort by id alongside the date column.
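
If the data has no such id column, one option is to mint a surrogate tie-breaker at read time with F.monotonically_increasing_id(). This is a sketch of the idea rather than a drop-in fix: the generated ids are increasing and unique but not consecutive, and they only reflect row order within the current partitioning.

# Sketch: create a surrogate tie-breaker when no primary key exists.
events = pets.drop('id').withColumn('row_id', F.monotonically_increasing_id())

tie_broken_window = (
    Window
    .partitionBy('breed_id')
    .orderBy('birthday', 'row_id')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

events.withColumn('first_of_breed', F.first('row_id').over(tie_broken_window)).toPandas()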
